Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pane info in BigQuery load job id #28272

Closed
wants to merge 5 commits into from

Conversation

Abacn
Copy link
Contributor

@Abacn Abacn commented Aug 31, 2023

Fixes #28219

  • Add randomness to job id

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Abacn
Copy link
Contributor Author

Abacn commented Aug 31, 2023

Run PostCommit_Java_Dataflow

@Abacn
Copy link
Contributor Author

Abacn commented Aug 31, 2023

Run PostCommit_Java_DataflowV2

@Abacn
Copy link
Contributor Author

Abacn commented Aug 31, 2023

There are still multiple code paths relies on the pane info, e.g. write_disposition and create_disposition settings: https://github.com/apache/beam/blob/3ff66d38c41fde475f71254d889ba46440904238/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java#L194C67-L194C67

There might be other issues after fixing this particular issue. Let me think more about it.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Abacn Abacn changed the title Add window to BigQuery load job id Add randomness to BigQuery load job id Sep 1, 2023
@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

there were test failures due to periodic impulse timing. Re-designed test. If passed then ready to review

c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
c.sideInput(jobIdToken), finalTableDestination, -1);

if (isFirstPane) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is another code path relying on pane info. Checked that upstream transform has a GBK and no ReShuffle in between: https://github.com/apache/beam/blob/0ed4c78a799cf5a6cc6a0b40b23ca498096769c5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L404C5-L404C5

so this one should be fine. Nevertheless it's good to add logging to monitor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR but we're actually seeing a bug due to relying on the pane index in this code path: #28309

@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

Run PostCommit_Java_DataflowV2

@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

Run PostCommit_Java_Dataflow

@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

Although Dataflow PostCommit have multiple quota exceeded failure, bigquery tests all passed: https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV2_PR/218/testReport/org.apache.beam.sdk.io.gcp.bigquery/

@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

R: @ahmedabu98

will fix spotless in next iteration

@github-actions
Copy link
Contributor

github-actions bot commented Sep 5, 2023

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

* {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time
* (pipeline starts processing).
*/
public PeriodicImpulse stopAfter(Duration duration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark with @Internal?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P.S. also add to catchUpToNow()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Comment on lines 61 to 63
// add randomness to jobId to avoid conflict
String jobId =
String.format("%s_%s_%s", prefix, destinationHash, randomUUIDString().substring(0, 16));
Copy link
Contributor

@ahmedabu98 ahmedabu98 Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried this may cause duplication of data due to bundle retry.

Let's say a bundle fails during load/copy execution and the BQ job was successful. Beam would process the bundle again and these lines will create a fresh job ID. Under previous circumstances, this job ID is constructed in a deterministic way and would be recognized by BQ as a recently successful job so will be ignored. But now since the ID is always new, BQ will execute the job and we will end up with duplicate data.

return GenerateSequence.from(0)
.to(rowCount)
.withRate(1, Duration.millis(timestampIntervalInMilliseconds));
static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this fixes the test cases that were supposed to be streaming but weren't.

c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
c.sideInput(jobIdToken), finalTableDestination, -1);

if (isFirstPane) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR but we're actually seeing a bug due to relying on the pane index in this code path: #28309

@Abacn Abacn changed the title Add randomness to BigQuery load job id Fix pane info in BigQuery load job id Sep 5, 2023
@Abacn
Copy link
Contributor Author

Abacn commented Sep 5, 2023

Run PostCommit_Java_DataflowV2

@Abacn
Copy link
Contributor Author

Abacn commented Sep 6, 2023

dataflow v2 postcommit passed; java precommit known flink test flakiness. PTAL @ahmedabu98

Comment on lines 212 to 222
LOG.info(
"Setup write disposition {}, create disposition {} for first pane BigQuery job {}",
writeDisposition,
createDisposition,
jobIdPrefix);
} else {
LOG.debug("Setup write disposition {}, create disposition {} for BigQuery job {}",
writeDisposition, createDisposition, jobIdPrefix);
LOG.debug(
"Setup write disposition {}, create disposition {} for BigQuery job {}",
writeDisposition,
createDisposition,
jobIdPrefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we include the bigquery table destination here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the destination name (without hash) may have PII. In the past I put the logs that may contain PII to debug level so they do not get stored by default (unless override to debug).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(note: final table destination is hased to jobId)

abstract Boolean isFirstPane();

abstract Long getPaneIndex();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we maintaining pane index coming out of write tables? I don't see that we're using it anywhere downstream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will revert the WriteTable.Result changes. Originally it was just to keep WritePartition.Result and WriteTable.Result similar.

return partitionResult.isFirstPane();
}

public long paneIndex() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to preserve the change to PendingJobData, as the pane index info would be helpful to resolve possible racing condition of CREATE_TRUNCATE pending jobs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean for debugging purposes? Will note that paneIndex() is still not being called anywhere

@ahmedabu98
Copy link
Contributor

Run PostCommit_Java_Dataflow

@ahmedabu98
Copy link
Contributor

Run PostCommit_Java_DataflowV2

Copy link
Contributor

@ahmedabu98 ahmedabu98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if tests pass

@kennknowles
Copy link
Member

Are you sure this is right? Isn't the purpose of the job id to prevent duplicates?

@reuvenlax
Copy link
Contributor

reuvenlax commented Sep 12, 2023 via email

@kennknowles
Copy link
Member

kennknowles commented Sep 13, 2023

Right. I was, indeed, thinking of reshuffle via random key. But what I'm saying does apply to Reshuffle by assigned keys. I have been a person who always says that Reshuffle is a semantic noop. If we should preserve that we need the v2 and the SDK implementation to record the pane info (aka reify WindowedValue instead of TimestampedValue). This issue has shaken up my beliefs.

If you have Combine -> (stage) -> Reshuffle. Each input to reshuffle has a pane index that corresponds to its key+window in the combine. The main thing is that if you have the original key+window+index you can reassemble the aggregate and/or write it to a sink in a consistent way. But if the stage changes the key you lose that property. Do we even have a good story for what happens to pane info when a DoFn reassigns to new keys?

@kennknowles
Copy link
Member

Notably we only have tests like ReshuffleTest.testReshufflePreservesTimestamps

public void testReshufflePreservesTimestamps() {

Considering that reifying and testing against a full windowed value would have be somewhat of a default thing to do, I would guess the original intent was very aware of this issue.

* Introduce PeriodicImpulse.stopAfter()

* Use it in streaming FILE_LOAD integration test

* Fix unit test assert to consider randomness in jobId
@Abacn
Copy link
Contributor Author

Abacn commented Sep 14, 2023

rebased onto master to test #28312

@Abacn
Copy link
Contributor Author

Abacn commented Sep 14, 2023

Run PostCommit_Java_DataflowV2

@github-actions
Copy link
Contributor

github-actions bot commented Sep 14, 2023

Test Results

  1 269 files  +  1 217    1 269 suites  +1 217   2h 44m 14s ⏱️ - 8h 29m 18s
10 612 tests +10 454  10 544 ✔️ +10 415  68 💤 +39  0 ±0 
10 636 runs  +10 478  10 568 ✔️ +10 439  68 💤 +39  0 ±0 

Results for commit 43f014c. ± Comparison against base commit 220cae7.

This pull request removes 158 and adds 10612 tests. Note that renamed tests count towards both.
org.apache.beam.examples.complete.TfIdfIT ‑ testE2ETfIdf
org.apache.beam.examples.complete.TrafficMaxLaneFlowIT ‑ testE2ETrafficMaxLaneFlow
org.apache.beam.examples.complete.TrafficRoutesIT ‑ testE2ETrafficRoutes
org.apache.beam.examples.cookbook.BigQueryTornadoesIT ‑ testE2EBigQueryTornadoesWithExport
org.apache.beam.examples.cookbook.BigQueryTornadoesIT ‑ testE2EBigQueryTornadoesWithExportUsingQuery
org.apache.beam.examples.cookbook.BigQueryTornadoesIT ‑ testE2eBigQueryTornadoesWithStorageApi
org.apache.beam.examples.cookbook.BigQueryTornadoesIT ‑ testE2eBigQueryTornadoesWithStorageApiUsingQuery
org.apache.beam.examples.cookbook.CombinePerKeyExamplesIT ‑ testE2ECombinePerKey
org.apache.beam.examples.cookbook.DistinctExampleIT ‑ testE2EDistinctExample
org.apache.beam.examples.cookbook.FilterExamplesIT ‑ testE2EFilterExamples
…
DefaultPackageTest ‑ defaultPackageInvoker
org.apache.beam.examples.DebuggingWordCountTest ‑ testDebuggingWordCount
org.apache.beam.examples.MinimalWordCountTest ‑ testMinimalWordCount
org.apache.beam.examples.WindowedWordCountIT ‑ testWindowedWordCountInBatchDynamicSharding
org.apache.beam.examples.WindowedWordCountIT ‑ testWindowedWordCountInBatchStaticSharding
org.apache.beam.examples.WindowedWordCountIT ‑ testWindowedWordCountInStreamingStaticSharding
org.apache.beam.examples.WordCountIT ‑ testE2EWordCount
org.apache.beam.examples.WordCountTest ‑ testCountWords
org.apache.beam.examples.WordCountTest ‑ testExtractWordsFn
org.apache.beam.examples.complete.AutoCompleteTest ‑ testAutoComplete[0]
…
This pull request removes 29 skipped tests and adds 68 skipped tests. Note that renamed tests count towards both.
org.apache.beam.sdk.io.gcp.bigquery.FileLoadsStreamingIT ‑ testDynamicDestinationsWithAutoShardingAndCopyJobs[0]
org.apache.beam.sdk.io.gcp.bigquery.FileLoadsStreamingIT ‑ testDynamicDestinationsWithFixedShards[0]
org.apache.beam.sdk.io.gcp.bigquery.FileLoadsStreamingIT ‑ testLoadWithAutoShardingAndCopyJobs[0]
org.apache.beam.sdk.io.gcp.bigquery.FileLoadsStreamingIT ‑ testLoadWithFixedShards[0]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithAutoSchemaUpdate[0]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithAutoSchemaUpdate[1]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithAutoSchemaUpdate[2]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithIgnoreUnknownValues[0]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithIgnoreUnknownValues[1]
org.apache.beam.sdk.io.gcp.bigquery.StorageApiSinkSchemaUpdateIT ‑ testAtLeastOnceWithIgnoreUnknownValues[2]
…
org.apache.beam.fn.harness.FnApiDoFnRunnerTest$ExecutionTest ‑ testUsingMetrics
org.apache.beam.runners.core.construction.ReadTranslationTest ‑ testToFromProtoBounded[0: org.apache.beam.sdk.io.CountingSource$UnboundedCountingSource@175cb038]
org.apache.beam.runners.core.construction.ReadTranslationTest ‑ testToFromProtoBounded[3: org.apache.beam.runners.core.construction.ReadTranslationTest$TestUnboundedSource@2ace8c5a]
org.apache.beam.runners.core.construction.ReadTranslationTest ‑ testToFromProtoUnbounded[1: [0, 100)]
org.apache.beam.runners.core.construction.ReadTranslationTest ‑ testToFromProtoUnbounded[2: org.apache.beam.runners.core.construction.ReadTranslationTest$TestBoundedSource@537fd806]
org.apache.beam.runners.dataflow.BatchStatefulParDoOverridesTest ‑ testFnApiMultiOutputOverrideNonCrashing
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest ‑ testMaxThreadMetric[0: [streamingEngine=false]]
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest ‑ testMaxThreadMetric[1: [streamingEngine=true]]
org.apache.beam.runners.dataflow.worker.util.MemoryMonitorTest ‑ uploadJfrProfilesOnThrashing
org.apache.beam.runners.direct.ExecutorServiceParallelExecutorTest ‑ ensureMetricsThreadDoesntLeak
…

♻️ This comment has been updated with latest results.

@Abacn
Copy link
Contributor Author

Abacn commented Sep 14, 2023

Tests now exercised and passed on Dataflow Runner V2: https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV2_PR/226/testReport/org.apache.beam.sdk.io.gcp.bigquery/FileLoadsStreamingIT/

@ahmedabu98
Copy link
Contributor

@reuvenlax as @Abacn mentioned, we broke update-compatibility recently in #28312 so that we can unblock dynamic destinations with copy jobs (#28309). Do we have the green light to merge these changes?
If Dataflow folks decide Runner V2 should preserve pane indices, that support may still take some time to implement. In the meantime, this ensures file loads streaming works on the runner (IIUC it previously never did)

@Abacn
Copy link
Contributor Author

Abacn commented Sep 15, 2023

Run Java PreCommit

@reuvenlax
Copy link
Contributor

reuvenlax commented Sep 17, 2023 via email

@kennknowles
Copy link
Member

Yea let's sync on this. I think I've come around but I want to get it on dev@ in at least a 1 pager for what the spec is. If it turns out to not be dangerous to reinstate, and we can regain the semantic property "reshuffle is a semantic noop" then that is pretty desirable, so that runners can freely add and remove reshuffles.

Noting that here is where Dataflow v1 implemented reshuffle and re-instated the pre-reshuffle pane:

Noting that Beam's reference implementation, the closest thing we have to a spec, does not preserve the pane but will produce a sequence of increasing panes.

@Abacn
Copy link
Contributor Author

Abacn commented Sep 19, 2023

@kennknowles thanks for pin to the related code path for Dataflow legacy worker. I now understand that Dataflow legacy work does some special PTransform override for ReShuffle. For portable pipelines, is that true that there is no longer similar PTransform override, and a possible fix would change java core's ReShuffle to also recover pane (currently it recovers Timestamp) ?

.apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());

@kennknowles
Copy link
Member

I expect there is a similar override inside the Dataflow service and Unified Worker. The implementation in the SDK would be slow and just for reference, if it was executed. A reshuffle really should not have to cross the Fn API.

@kennknowles
Copy link
Member

But yes if we make the SDK recover the pane info also, then that would make the SDK (direct runner, prism, etc) match Dataflow v1. This would probably not fix v2, but it would fix "the spec".

@kennknowles
Copy link
Member

OK so when I thought about it I agree with everyone else that the reference implementation is wrong and that the right behavior is to be the same as the identity function on PCollections. That means keeping the pane info exactly. I wrote the dev list about it. I think it is probably such a boring thread that it will not get a lot of comment.

c.sideInput(loadJobIdPrefixView),
tableDestination,
partition,
element.getValue().getPaneIndex());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that if we have this:

GBK -> processing -> reshuffle -> this DoFn

Can you tell me which case we are in?

  1. The processing left the key the same so the reshuffle is just for checkpoint
  2. The processing changed the key so that the current element key + pane index is no longer unique

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's a little bit more subtle. It is the same key, but became ShardedKey after processing

After GBK, key is DestinationT
After processing, key is ShardedKey (shard is added in WritePartitions)

@Abacn Abacn marked this pull request as draft October 18, 2023 20:01
Copy link
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Dec 18, 2023
Copy link
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Dec 26, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants